// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Ignite DataFrame

== Overview

The Apache Spark DataFrame API introduced the concept of a schema to describe the data, allowing Spark to manage the schema and organize the data into a tabular format. To put it simply, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database and allows Spark to leverage the Catalyst query optimizer to produce much more efficient query execution plans in comparison to RDDs, which are just collections of elements partitioned across the nodes of the cluster.

Ignite expands DataFrame, simplifying development and improving data access times whenever Ignite is used as memory-centric storage for Spark. Benefits include:

* Ability to share data and state across Spark jobs by writing and reading DataFrames to/from Ignite.
* Faster SparkSQL queries by optimizing Spark query execution plans with the Ignite SQL engine, which includes advanced indexing and avoids data movement across the network from Ignite to Spark.

== Integration

`IgniteRelationProvider` is an implementation of the Spark `RelationProvider` and `CreatableRelationProvider` interfaces. It can talk directly to Ignite tables through the Spark SQL interface. Data is loaded and exchanged via `IgniteSQLRelation`, which executes filtering operations on the Ignite side. For now, grouping, joining, and ordering operations are performed on the Spark side. These operations will be optimized and processed on the Ignite side in link:https://issues.apache.org/jira/browse/IGNITE-7077[upcoming releases^]. `IgniteSQLRelation` utilizes the partitioned nature of Ignite's architecture and provides partitioning information to Spark.

== Spark Session

To use the Apache Spark DataFrame API, it is necessary to create an entry point for programming with Spark. This is achieved through the use of a `SparkSession` object, as shown in the following example:

[tabs]
--
tab:Java[]
[source, java]
----
// Creating spark session.
SparkSession spark = SparkSession.builder()
  .appName("Example Program")
  .master("local")
  .config("spark.executor.instances", "2")
  .getOrCreate();
----

tab:Scala[]
[source, scala]
----
// Creating spark session.
implicit val spark = SparkSession.builder()
  .appName("Example Program")
  .master("local")
  .config("spark.executor.instances", "2")
  .getOrCreate()
----
--

== Reading DataFrames

To read data from Ignite, you need to specify the data source format and the path to the Ignite configuration file. For example, assume an Ignite table named `person` is created and deployed in Ignite, as follows:


[source, sql]
----
CREATE TABLE person (
    id LONG,
    name VARCHAR,
    city_id LONG,
    PRIMARY KEY (id, city_id)
) WITH "backups=1, affinityKey=city_id";
----

The following Spark code finds all the rows from the `person` table where the name is 'Mary Major':

[tabs]
--

tab:Java[]

[source, java]
----
SparkSession spark = ...
String cfgPath = "path/to/config/file";

Dataset<Row> df = spark.read()
  .format(IgniteDataFrameSettings.FORMAT_IGNITE())               // Data source type.
  .option(IgniteDataFrameSettings.OPTION_TABLE(), "person")      // Table to read.
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), cfgPath) // Ignite config.
  .load();

df.createOrReplaceTempView("person");

Dataset<Row> igniteDF = spark.sql(
  "SELECT * FROM person WHERE name = 'Mary Major'");
----


tab:Scala[]

[source, scala]
----
val spark: SparkSession = ...
val cfgPath: String = "path/to/config/file"

val df = spark.read
  .format(FORMAT_IGNITE)               // Data source type.
  .option(OPTION_TABLE, "person")      // Table to read.
  .option(OPTION_CONFIG_FILE, cfgPath) // Ignite config.
  .load()

df.createOrReplaceTempView("person")

val igniteDF = spark.sql("SELECT * FROM person WHERE name = 'Mary Major'")
----
--



== Saving DataFrames

[NOTE]
====
[discrete]
=== Implementation notes
Internally, all inserts are done through `IgniteDataStreamer`. Several optional parameters exist to configure the internal streamer. See <<Ignite DataFrame Options>> for the list of available options.
====


Ignite can serve as storage for DataFrames created or updated in Spark. The following save modes determine how a DataFrame is processed in Ignite:

* `Append` - the DataFrame will be appended to an existing table. Set `OPTION_STREAMER_ALLOW_OVERWRITE=true` if you want to update existing entries with the data of the DataFrame.
* `Overwrite` - the following steps will be executed:
** If the table already exists in Ignite, it will be dropped.
** A new table will be created using the schema of the DataFrame and provided options.
** DataFrame content will be inserted into the new table.
* `ErrorIfExists` (default) - an exception is thrown if the table already exists in Ignite. If a table does not exist:
** A new table will be created using the schema of the DataFrame and provided options.
** DataFrame content will be inserted into the new table.
* `Ignore` - the operation is ignored if the table already exists in Ignite. If a table does not exist:
** A new table will be created using the schema of the DataFrame and provided options.
** DataFrame content will be inserted into the new table.
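
The save-mode rules above can be read as a small decision table. The following is a minimal sketch of that table in plain Java, with no Spark or Ignite dependency. The class `SaveModeSketch`, its `Mode` enum, and the step labels are hypothetical names used only for illustration; this is not how Ignite implements saving. The behavior of `Append` when the table does not exist is not stated on this page, so the sketch assumes the table is created, as in the other modes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the save-mode rules listed above.
// It only returns step labels; it is NOT Ignite's implementation.
class SaveModeSketch {
    enum Mode { APPEND, OVERWRITE, ERROR_IF_EXISTS, IGNORE }

    /** Returns the ordered steps a save would take for the given mode. */
    static List<String> plan(Mode mode, boolean tableExists) {
        List<String> steps = new ArrayList<>();

        if (!tableExists) {
            // Overwrite, ErrorIfExists and Ignore create the table when it is missing.
            // Assumption: Append does the same (not stated in the text above).
            steps.add("CREATE TABLE");
            steps.add("INSERT");
            return steps;
        }

        switch (mode) {
            case APPEND:
                // Rows are added to the existing table.
                steps.add("INSERT");
                break;
            case OVERWRITE:
                // The existing table is dropped and rebuilt from the DataFrame schema.
                steps.add("DROP TABLE");
                steps.add("CREATE TABLE");
                steps.add("INSERT");
                break;
            case ERROR_IF_EXISTS:
                throw new IllegalStateException("Table already exists");
            case IGNORE:
                // Nothing happens: the save is silently skipped.
                break;
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(plan(Mode.OVERWRITE, true));
        System.out.println(plan(Mode.IGNORE, true));
        System.out.println(plan(Mode.ERROR_IF_EXISTS, false));
    }
}
```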

The save mode can be specified using the `mode(SaveMode mode)` method. For more information, see the link:https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter@mode&lpar;saveMode:org.apache.spark.sql.SaveMode&rpar;:org.apache.spark.sql.DataFrameWriter%5BT%5D[Spark Documentation^]. Here is a code example that shows this method:


[tabs]
--
tab:Java[]

[source, java]
----
SparkSession spark = ...

String cfgPath = "path/to/config/file";

Dataset<Row> jsonDataFrame = spark.read().json("path/to/file.json");

jsonDataFrame.write()
  .format(IgniteDataFrameSettings.FORMAT_IGNITE())
  .mode(SaveMode.Append) // SaveMode.
  //... other options
  .save();
----

tab:Scala[]

[source, scala]
----
val spark: SparkSession = ...

val cfgPath: String = "path/to/config/file"

val jsonDataFrame = spark.read.json("path/to/file.json")

jsonDataFrame.write
  .format(FORMAT_IGNITE)
  .mode(SaveMode.Append) // SaveMode.
  //... other options
  .save()
----
--

You must define the following Ignite-specific options if the save operation creates a new table:

* `OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS` - a primary key is required for every Ignite table. This option has to contain a comma-separated list of fields/columns that represent a primary key.
* `OPTION_CREATE_TABLE_PARAMETERS` - additional parameters to use upon Ignite table creation. The parameters are those that are supported by the link:sql-reference/ddl#create-table[CREATE TABLE] command.
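
To make the role of these two options more concrete, here is a minimal sketch in plain Java of how their values could map onto a `CREATE TABLE` statement: the primary key fields go into the `PRIMARY KEY` clause and the parameters go into the `WITH` clause. The class `CreateTableSketch`, the method `createTableSql`, and the column names and types are hypothetical. The exact statement Ignite generates may differ, so treat the resulting string as an illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper; it illustrates where the option values end up
// in a CREATE TABLE statement and is NOT part of the Ignite API.
class CreateTableSketch {
    /**
     * @param table            value of OPTION_TABLE
     * @param columns          column name to SQL type, in declaration order
     * @param primaryKeyFields value of OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS
     * @param parameters       value of OPTION_CREATE_TABLE_PARAMETERS, may be null
     */
    static String createTableSql(String table, Map<String, String> columns,
                                 String primaryKeyFields, String parameters) {
        StringJoiner cols = new StringJoiner(", ");
        for (Map.Entry<String, String> column : columns.entrySet())
            cols.add(column.getKey() + " " + column.getValue());

        String sql = "CREATE TABLE " + table
            + " (" + cols + ", PRIMARY KEY (" + primaryKeyFields + "))";

        // The parameters string is used as the WITH part of the statement.
        if (parameters != null && !parameters.isEmpty())
            sql += " WITH \"" + parameters + "\"";

        return sql;
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "LONG");      // assumed column, for illustration
        columns.put("name", "VARCHAR"); // assumed column, for illustration

        System.out.println(
            createTableSql("json_table", columns, "id", "template=replicated"));
    }
}
```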

The following example shows how to write the content of a JSON file into Ignite:

[tabs]
--
tab:Java[]

[source, java]
----
SparkSession spark = ...

String cfgPath = "path/to/config/file";

Dataset<Row> jsonDataFrame = spark.read().json("path/to/file.json");

jsonDataFrame.write()
  .format(IgniteDataFrameSettings.FORMAT_IGNITE())
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), cfgPath)
  .option(IgniteDataFrameSettings.OPTION_TABLE(), "json_table")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated")
  .save();
----

tab:Scala[]

[source, scala]
----
val spark: SparkSession = ...

val cfgPath: String = "path/to/config/file"

val jsonDataFrame = spark.read.json("path/to/file.json")

jsonDataFrame.write
  .format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, cfgPath)
  .option(OPTION_TABLE, "json_table")
  .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
  .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
  .save()
----

--

== IgniteSparkSession and IgniteExternalCatalog

Spark introduces an entity called `catalog` to read and store meta-information about known data sources, such as tables and views. Ignite provides its own implementation of this catalog, called `IgniteExternalCatalog`.

`IgniteExternalCatalog` can read information about all existing SQL tables deployed in the Ignite cluster. `IgniteExternalCatalog` is also required to build an `IgniteSparkSession` object.

`IgniteSparkSession` is an extension of the regular `SparkSession` that stores `IgniteContext` and injects the `IgniteExternalCatalog` instance into Spark objects.

`IgniteSparkSession.builder()` must be used to create `IgniteSparkSession`. For example, if the following two tables are created in Ignite:



[source, sql]
----
CREATE TABLE city (
    id LONG PRIMARY KEY,
    name VARCHAR
) WITH "template=replicated";

CREATE TABLE person (
    id LONG,
    name VARCHAR,
    city_id LONG,
    PRIMARY KEY (id, city_id)
) WITH "backups=1, affinityKey=city_id";
----


Then executing the following code provides table meta-information:


[tabs]
--
tab:Java[]

[source, java]
----
// Using SparkBuilder provided by Ignite.
IgniteSparkSession igniteSession = IgniteSparkSession.builder()
  .appName("Spark Ignite catalog example")
  .master("local")
  .config("spark.executor.instances", "2")
  //Only additional option to refer to Ignite cluster.
  .igniteConfig("/path/to/ignite/config.xml")
  .getOrCreate();

// This will print out info about all SQL tables existing in Ignite.
igniteSession.catalog().listTables().show();

// This will print out schema of PERSON table.
igniteSession.catalog().listColumns("person").show();

// This will print out schema of CITY table.
igniteSession.catalog().listColumns("city").show();
----


tab:Scala[]

[source, scala]
----
// Using SparkBuilder provided by Ignite.
val igniteSession = IgniteSparkSession.builder()
  .appName("Spark Ignite catalog example")
  .master("local")
  .config("spark.executor.instances", "2")
  //Only additional option to refer to Ignite cluster.
  .igniteConfig("/path/to/ignite/config.xml")
  .getOrCreate()

// This will print out info about all SQL tables existing in Ignite.
igniteSession.catalog.listTables().show()

// This will print out schema of PERSON table.
igniteSession.catalog.listColumns("person").show()

// This will print out schema of CITY table.
igniteSession.catalog.listColumns("city").show()
----
--

The output should be similar to the following:



[source, text]
----
+------+--------+-----------+---------+-----------+
|  name|database|description|tableType|isTemporary|
+------+--------+-----------+---------+-----------+
|  CITY|        |       null| EXTERNAL|      false|
|PERSON|        |       null| EXTERNAL|      false|
+------+--------+-----------+---------+-----------+

PERSON table description:

+-------+-----------+--------+--------+-----------+--------+
|   name|description|dataType|nullable|isPartition|isBucket|
+-------+-----------+--------+--------+-----------+--------+
|   NAME|       null|  string|    true|      false|   false|
|     ID|       null|  bigint|   false|       true|   false|
|CITY_ID|       null|  bigint|   false|       true|   false|
+-------+-----------+--------+--------+-----------+--------+

CITY table description:

+----+-----------+--------+--------+-----------+--------+
|name|description|dataType|nullable|isPartition|isBucket|
+----+-----------+--------+--------+-----------+--------+
|NAME|       null|  string|    true|      false|   false|
|  ID|       null|  bigint|   false|       true|   false|
+----+-----------+--------+--------+-----------+--------+
----







== Ignite DataFrame Options


[cols="1,2",opts="header"]
|===
| Name  | Description
|`FORMAT_IGNITE` | Name of the Ignite Data Source.
|`OPTION_CONFIG_FILE` | Path to the config file.
|`OPTION_TABLE` | Table name.
|`OPTION_CREATE_TABLE_PARAMETERS` | Additional parameters for a newly created table. The value of this option is used for the `WITH` part of a `CREATE TABLE` query.
|`OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS` | Comma-separated list of primary key fields.
|`OPTION_STREAMER_ALLOW_OVERWRITE` | If `true`, then an existing row will be overwritten with DataFrame content. If `false`, then the row will be skipped if the primary key already exists in the table.
|`OPTION_STREAMER_FLUSH_FREQUENCY` | Automatic flush frequency. This is the time after which the streamer will make an attempt to submit all data added so far to remote nodes. See link:data-streaming[Data Streaming].
|`OPTION_STREAMER_PER_NODE_BUFFER_SIZE` | Per-node buffer size. The size of the per-node key-value pairs buffer.
|`OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS` | Per-node parallel operations. The maximum number of parallel stream operations for a single node.
|`OPTION_SCHEMA` | The Ignite SQL schema name in which the specified table exists. When `OPTION_SCHEMA` is not specified, all schemas will be scanned to find a table with a matching name. This option can be used to differentiate two tables of the same name in different Ignite SQL schemas.

When creating new tables, `OPTION_SCHEMA` must be specified as `PUBLIC`; otherwise, an exception will be thrown because Ignite SQL can currently issue `CREATE TABLE` statements within the `PUBLIC` schema only.

|===
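
The effect of `OPTION_STREAMER_ALLOW_OVERWRITE` described in the table can be pictured with an ordinary map keyed by primary key. The sketch below is plain Java and only models the documented rule (overwrite when `true`, skip existing keys when `false`). The class `AllowOverwriteSketch` and its `write` method are hypothetical and do not use the Ignite streamer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the allow-overwrite rule; a Map stands in for the table.
class AllowOverwriteSketch {
    /** Writes rows into the table, keyed by primary key. */
    static void write(Map<Long, String> table, Map<Long, String> rows, boolean allowOverwrite) {
        for (Map.Entry<Long, String> row : rows.entrySet()) {
            if (allowOverwrite)
                table.put(row.getKey(), row.getValue());         // existing row is replaced
            else
                table.putIfAbsent(row.getKey(), row.getValue()); // existing row is kept
        }
    }

    public static void main(String[] args) {
        Map<Long, String> table = new LinkedHashMap<>();
        table.put(1L, "old");

        Map<Long, String> rows = new LinkedHashMap<>();
        rows.put(1L, "new");
        rows.put(2L, "added");

        write(table, rows, false);
        System.out.println(table); // key 1 keeps its previous value

        write(table, rows, true);
        System.out.println(table); // key 1 now holds the DataFrame value
    }
}
```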

== Examples

There are several examples available on GitHub that demonstrate how to use Spark DataFrames with Ignite:

* link:{githubUrl}/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameExample.scala[DataFrame]
* link:{githubUrl}/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameWriteExample.scala[Saving DataFrame]
* link:{githubUrl}/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteCatalogExample.scala[Catalog]
